package com.xiaohu.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

/*
    自定义kafka序列化器
 */
public class KafkaSinkWithKey {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
//        env.setStateBackend(new EmbeddedRocksDBStateBackend()); //设置状态后端

        SingleOutputStreamOperator<String> ds1 = env.socketTextStream("master", 7777);

        ds1.print("收到数据：");
        /*
            精确一次的额外设置：
                1、开启checkpoint
                2、设置事务前缀
                3、设置事务超时时间 checkpoint时间间隔 < 事务超时时间 < 15min
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092") //设置kafka节点，生产环境多指定几个
                //.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                //精准一次 = 至少一次 + 幂等性
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) //一致性级别，设置至少一次还是精准一次
                //如果是精准一次，必须设置事务超时时间：大于enableCheckpointing的时间间隔，小于 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"600000")
                .setProperty("auto.create.topics.enable","true") // 设置自动创建topic
                //如果是精准一次，会启动kafka事务，设置该配置以及开启checkpoint，指定事务前缀
                .setTransactionalIdPrefix("xiaohu-")
                .setRecordSerializer( // kafka的生产数据和接收数据使用的是序列化和反序列化，这里是发送是序列化
//                        KafkaRecordSerializationSchema.<String>builder()
//                                .setTopic("wc") // 指定发送的topic
//                                .setValueSerializationSchema(new SimpleStringSchema()) //指定value序列化器
//                                .build())
                        //TODO：使用自定义的序列化类
                        new KafkaRecordSerializationSchema<String>() {
                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("wc",key,value);
                            }
                        })
                .build();


        ds1.sinkTo(kafkaSink);

        env.execute();
    }
}
